package com.example.service.impl;

import cn.hutool.core.bean.BeanUtil;
import com.example.entity.PriceData;
import com.example.util.TimeConstant;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.text.DecimalFormat;
import java.time.LocalDate;
import java.util.*;

import static com.example.util.HBaseTableNameConstant.*;

/**
 * @author xqz
 * @description TODO
 * @dateTime 2023/5/16 21:18
 */
@Service
public class HBaseServicePlus {

    @Autowired
    private Connection hbaseConnection;

    private void createTableIfNotExists(TableName tableName, String columnName) throws IOException {
        try (Admin admin = hbaseConnection.getAdmin()) {
            if (!admin.tableExists(tableName)) {
                HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
                tableDescriptor.addFamily(new HColumnDescriptor(columnName));
                admin.createTable(tableDescriptor);
            }
        }
    }

    /**
     * 插入某省份某年某月的平均价格
     * @author xqz
     * @apiNote 往HBse插入数据, 插入的数据为省份,年份,月份以及平均价格
     * @dateTime 2023/5/17 10:23
     * @param province 省份
     * @param year 年份
     * @param month 月份
     * @param averagePrice 平均价格
     */
    public void putHBaseData(String province, String year, String month, String averagePrice) throws IOException {
        TableName tableName = TableName.valueOf(AVG_PRICE_DATA_OF_MONTHS_BY_YEAR_AND_PROVINCE_TABLE);
        // 判断是否需要创建表
        createTableIfNotExists(tableName, AVG_PRICE_DATA_OF_MONTHS_BY_YEAR_AND_PROVINCE_TABLE_COLUMN_NAME);
        // 获取到'PriceData'表的句柄
        Table table = hbaseConnection.getTable(tableName);

        // 创建一个'Put'操作，行键为 "province_year_month"
        String rowKey = province + "_" + year + "_" + month;
        Put put = new Put(Bytes.toBytes(rowKey));

        // 将数据插入到'info'列族中的相应列
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("province"), Bytes.toBytes(province));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("year"), Bytes.toBytes(year));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("month"), Bytes.toBytes(month));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("average_price"), Bytes.toBytes(averagePrice));

        // 在表中执行'Put'操作
        table.put(put);

        // 关闭表连接
        table.close();
    }

    /**
     * 获取某个省份某年某月的平均价格
     * @author xqz
     * @apiNote 获取HBase中的数据,该数据是某个省份某一年某一月份的平均价格
     * @dateTime 2023/5/17 10:06
     * @param province 省份
     * @param year 年份
     * @param month 月份
     * @return PriceData 价格数据对象
     */
    public PriceData getHBaseData(String province, String year, String month) throws IOException {
        TableName tableName = TableName.valueOf(AVG_PRICE_DATA_OF_MONTHS_BY_YEAR_AND_PROVINCE_TABLE);
        // 获取到'PriceData'表的句柄
        Table table = hbaseConnection.getTable(tableName);
        // 创建一个'Get'操作，获取键为 "province_year_month"的行
        String rowKey = province + "_" + year + "_" + month;
        Get get = new Get(Bytes.toBytes(rowKey));

        // 在表中执行'Get'操作
        Result result = table.get(get);

        // 从'Result'对象中获取'info'列族中各列的值
        byte[] provinceBytes = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("province"));
        byte[] yearBytes = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("year"));
        byte[] monthBytes = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("month"));
        byte[] priceBytes = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("average_price"));

        String provinceValue = Bytes.toString(provinceBytes);
        String yearValue = Bytes.toString(yearBytes);
        String monthValue = Bytes.toString(monthBytes);
        String priceValue = Bytes.toString(priceBytes);

        if (monthValue == null || monthValue.isEmpty()) {
            monthValue = month;
            priceValue = "0";
        }

        // 关闭表连接
        table.close();
        // 创建 DecimalFormat 对象，指定保留两位小数的格式
        DecimalFormat decimalFormat = new DecimalFormat("#.##");
        // 创建 PriceData 对象并添加到列表中
        String valueFormatted = decimalFormat.format(Double.valueOf(priceValue));  // 格式化 value 的值，保留两位小数
        // 返回各列的值
        return new PriceData(provinceValue, yearValue, monthValue, Double.valueOf(valueFormatted));
        // return "Province: " + provinceValue + ", Year: " + yearValue + ", Month: " + monthValue + ", Average Price: " + priceValue;
    }

    /**
     * 插入某年某省份的平均价格
     * @author xqz
     * @apiNote 往AvgPriceOfYearData表插入数据, 插入的数据为年份, 省份以及平均价格
     * @dateTime 2023/5/17 10:23
     * @param year 年份
     * @param province 省份
     * @param averagePrice 平均价格
     */
    public void putHBaseData(String province, String year, String averagePrice) throws IOException {
        TableName tableName = TableName.valueOf(AVG_PRICE_DATA_OF_PROVINCES_BY_YEAR_TABLE);
        // 判断是否需要创建表
        createTableIfNotExists(tableName, AVG_PRICE_DATA_OF_PROVINCES_BY_YEAR_TABLE_COLUMN_NAME);
        // 获取到表的句柄
        Table table = hbaseConnection.getTable(tableName);

        // 创建一个'Put'操作，行键为 "year"
        String rowKey = year;
        Put put = new Put(Bytes.toBytes(rowKey));

        // 将数据插入到'province'列族中的相应列
        put.addColumn(Bytes.toBytes(AVG_PRICE_DATA_OF_PROVINCES_BY_YEAR_TABLE_COLUMN_NAME), Bytes.toBytes(province), Bytes.toBytes(averagePrice));

        // 在表中执行'Put'操作
        table.put(put);

        // 关闭表连接
        table.close();
    }

    /**
     * 获取某年某个省份的平均价格
     * @author xqz
     * @apiNote 获取HBase中的数据,该数据是某一年某个省份的平均价格
     * @dateTime 2023/5/17 10:06
     * @param year 年份
     * @param province 省份
     * @return PriceData 价格数据对象
     */
    public PriceData getHBaseData(String province, String year) throws IOException {
        TableName tableName = TableName.valueOf(AVG_PRICE_DATA_OF_PROVINCES_BY_YEAR_TABLE);
        // 获取到表的句柄
        Table table = hbaseConnection.getTable(tableName);
        // 创建一个'Get'操作，获取键为 "year"的行
        String rowKey = year;
        Get get = new Get(Bytes.toBytes(rowKey));

        // 在表中执行'Get'操作
        Result result = table.get(get);

        // 从'Result'对象中获取'province'列族中的省份列的值
        byte[] priceBytes = result.getValue(Bytes.toBytes(AVG_PRICE_DATA_OF_PROVINCES_BY_YEAR_TABLE_COLUMN_NAME), Bytes.toBytes(province));

        String priceValue = Bytes.toString(priceBytes);

        // 关闭表连接
        table.close();

        if(priceValue == null || priceValue.isEmpty()) {
            System.out.println("priceValue = " + priceValue);
            return null;
        }
        // 创建 DecimalFormat 对象，指定保留两位小数的格式
        DecimalFormat decimalFormat = new DecimalFormat("#.##");
        // 创建 PriceData 对象并添加到列表中
        String valueFormatted = decimalFormat.format(Double.valueOf(priceValue));  // 格式化 value 的值，保留两位小数
        // 返回各列的值
        return new PriceData(province, year, TimeConstant.DEFAULT_MONTH, Double.valueOf(valueFormatted));
    }

    /**
     * 获取某年所有省份的价格数据
     * @author xqz
     * @apiNote 获取HBase中的数据,该数据是某一年所有省份的所有价格数据
     * @dateTime 2023/5/17 10:06
     * @param rowKey 行键
     * @return List<PriceData> 价格数据对象列表
     */
    public List<PriceData> getAllYearOfProvinceData(String rowKey) throws IOException {
        TableName tableName = TableName.valueOf(AVG_PRICE_DATA_OF_PROVINCES_BY_YEAR_TABLE);
        // 获取到表的句柄
        Table table = hbaseConnection.getTable(tableName);
        // 创建一个'Get'操作，获取键为 "rowKey"的行
        Get get = new Get(Bytes.toBytes(rowKey));

        // 在表中执行'Get'操作
        Result result = table.get(get);

        List<PriceData> priceDataList = new ArrayList<>();
        double minValue = Double.MAX_VALUE;  // 初始化最小值为最大可能值
        double maxValue = Double.MIN_VALUE;  // 初始化最大值为最小可能值

        // 迭代列
        for (Cell cell : result.rawCells()) {
            String family = Bytes.toString(CellUtil.cloneFamily(cell));
            String column = Bytes.toString(CellUtil.cloneQualifier(cell));
            String value = Bytes.toString(CellUtil.cloneValue(cell));

            // 创建 DecimalFormat 对象，指定保留两位小数的格式
            DecimalFormat decimalFormat = new DecimalFormat("#.##");
            // 创建 PriceData 对象并添加到列表中
            String valueFormatted = decimalFormat.format(Double.valueOf(value));  // 格式化 value 的值，保留两位小数
            double parsedValue = Double.valueOf(valueFormatted);

            if (parsedValue < minValue) {
                minValue = parsedValue;  // 更新最小值
            }
            if (parsedValue > maxValue) {
                maxValue = parsedValue;  // 更新最大值
            }

            PriceData priceData = new PriceData(column, rowKey, TimeConstant.DEFAULT_MONTH, parsedValue);
            priceDataList.add(priceData);
        }

        // 关闭表连接
        table.close();

        // 创建包含最大值和最小值的 PriceData 对象，并添加到列表的开头
        priceDataList.add(0, new PriceData("Max Value", rowKey, TimeConstant.DEFAULT_MONTH, maxValue));
        priceDataList.add(0, new PriceData("Min Value", rowKey, TimeConstant.DEFAULT_MONTH, minValue));

        // 返回所有列的值
        return priceDataList;
    }

    /**
     *
     * @author xqz
     * @apiNote 存储数据, 供动态柱形图使用
     * @dateTime 2023/5/21 22:10
     * @param province 省份
     * @param year 年份
     * @param month 月份
     * @param averagePrice 平均价格
     */
    public void putDataByDynamicBarChart(String province, String year, String month, String averagePrice) throws IOException {
        TableName tableName = TableName.valueOf(AVG_PRICE_DATA_OF_PROVINCES_AND_MONTHS_BY_YEAR_TABLE);
        // 判断是否需要创建表
        createTableIfNotExists(tableName, AVG_PRICE_DATA_OF_PROVINCES_AND_MONTHS_BY_YEAR_TABLE_COLUMN_NAME);
        Table table = hbaseConnection.getTable(tableName);
        String rowKey = year + "_" + province + "_" + month;
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(AVG_PRICE_DATA_OF_PROVINCES_AND_MONTHS_BY_YEAR_TABLE_COLUMN_NAME), Bytes.toBytes("average_price"), Bytes.toBytes(averagePrice));
        table.put(put);
        table.close();
    }

    /**
     * 获取某年的->所有省份所有月份的数据
     * @author xqz
     * @apiNote 获取数据供动态柱形图使用, 获取某年的->所有省份所有月份的数据, 返回Map集合,key是省份,value是月份的Map集合
     * @dateTime 2023/5/21 22:12
     * @param year 年份
     */
    public Map<String, Map<String, Double>> getAllProvinceMonthDataByYear(String year) throws IOException {
        Map<String, Map<String, Double>> yearData = new HashMap<>();
        TableName tableName = TableName.valueOf(AVG_PRICE_DATA_OF_PROVINCES_AND_MONTHS_BY_YEAR_TABLE);
        Table table = hbaseConnection.getTable(tableName);

        Scan scan = new Scan().withStartRow(Bytes.toBytes(year)).withStopRow(Bytes.toBytes(String.valueOf(Integer.parseInt(year) + 1)));
        ResultScanner scanner = table.getScanner(scan);
        for (Result result = scanner.next(); result != null; result = scanner.next()) {
            String rowKey = Bytes.toString(result.getRow());
            String[] parts = rowKey.split("_");
            String province = parts[1];
            String month = parts[2];
            byte[] priceBytes = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("average_price"));
            String priceValue = Bytes.toString(priceBytes);
            double averagePrice = Double.parseDouble(priceValue);
            yearData.computeIfAbsent(month, k -> new HashMap<>()).put(province, averagePrice);
        }
        scanner.close();
        table.close();

        return yearData;
    }

    public List<PriceData> getHBaseDataForMonth(String province, String year, String month) throws IOException {
        TableName tableName = TableName.valueOf("agriculture_price_table");
        Table table = hbaseConnection.getTable(tableName);

        String startRowKey = year + "-" + month + "-01";
        String endRowKey = year + "-" + month + "-31";

        Scan scan = new Scan().withStartRow(Bytes.toBytes(startRowKey))
                .withStopRow(Bytes.toBytes(endRowKey))
                .addColumn(Bytes.toBytes("price"), Bytes.toBytes(province + "_avgPrice"));

        Scan scan2 = new Scan().withStartRow(Bytes.toBytes(startRowKey))
                .withStopRow(Bytes.toBytes(endRowKey))
                .addColumn(Bytes.toBytes("price"), Bytes.toBytes(province + "_highPrice"));

        ResultScanner scanner = table.getScanner(scan);
        ResultScanner scanner2 = table.getScanner(scan2);

        List<PriceData> priceDataList = new ArrayList<>();

        for (Result result : scanner) {
            byte[] rowKeyBytes = result.getRow();
            byte[] avgPriceBytes = result.getValue(Bytes.toBytes("price"), Bytes.toBytes(province + "_avgPrice"));

            String rowKey = Bytes.toString(rowKeyBytes);
            String[] rowKeyParts = rowKey.split("-");
            String day = rowKeyParts[2];   // 取最后一部分为 day

            String avgPriceStr = Bytes.toString(avgPriceBytes);
            double avgPrice = Double.parseDouble(avgPriceStr);

            PriceData priceData = new PriceData(province, year, month, avgPrice,0d, day);
            priceDataList.add(priceData);
        }

        List<PriceData> priceDataList2 = new ArrayList<>();
        for (Result result : scanner2) {
            byte[] rowKeyBytes = result.getRow();
            byte[] maxPriceBytes = result.getValue(Bytes.toBytes("price"), Bytes.toBytes(province + "_highPrice"));

            String rowKey = Bytes.toString(rowKeyBytes);
            String[] rowKeyParts = rowKey.split("-");
            String day = rowKeyParts[2];   // 取最后一部分为 day

            String maxPriceStr = Bytes.toString(maxPriceBytes);
            double maxPrice = Double.parseDouble(maxPriceStr);

            PriceData priceData = new PriceData(province, year, month,0d , maxPrice, day);
            priceDataList2.add(priceData);
        }

        // 遍历 priceDataList2
        for (PriceData priceData2 : priceDataList2) {
            String day2 = priceData2.getDay();

            // 在 priceDataList 中查找相同日期
            for (PriceData priceData : priceDataList) {
                if (priceData.getDay().equals(day2)) {
                    // 将 maxPrice 复制到 priceDataList 对象中
                    priceData.setMaxPrice(priceData2.getMaxPrice());
                    break;
                }
            }
        }
        table.close();
        return priceDataList;
    }

    private int getLastDayOfMonth(String year, String month) {
        int yearInt = Integer.parseInt(year);
        int monthInt = Integer.parseInt(month);
        LocalDate date = LocalDate.of(yearInt, monthInt, 1);
        return date.lengthOfMonth();
    }
}

